package com.cloudansys.core.util;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class FlinkUtil {

    public static StreamExecutionEnvironment getFlinkEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 关闭日志打印
//        env.getConfig().disableSysoutLogging();

        // 启动检查点，设置每 5000 ms 进行一次 Checkpoint 操作
        env.enableCheckpointing(5000);

        // 设置重启策略：控制 job 执行失败后如何重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, org.apache.flink.api.common.time.Time.of(100, TimeUnit.MILLISECONDS)));

        // 设置精准一次（默认）
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 确保两次的 checkpoint 时间间隔至少是 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // CheckPoint 的超时时间，checkpoints 必须在1分钟内完成，否则丢弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // 设置如果任务取消，系统该如何处理检查点数据
        // RETAIN_ON_CANCELLATION：如果取消任务的时候，没有加 --savepoint，系统会保留检查点数据
        // DELETE_ON_CANCELLATION：取消任务，自动是删除检查点（不建议使用）
        env
                .getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 当有较新的 Savepoint 时，作业也会从 Checkpoint 处恢复
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        // 允许检查点失败一次
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

        // 一次只至允许一个 checkpoint 执行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //设置使用 EventTime，默认使用 ProcessTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        return env;
    }

}
